paho-mqttをpytestでmock化してみる
はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回は、paho-mqttをpytestでmock化する実装に取り組みたいと思います。
前提
paho-mqttはMQTTブローカーに接続してmessageをpublishし、topicをsubscribeしてpublishされたmessageを受信できるようにするclient classです。
pytestでtestコードを実装しようとした際にpahoのサンプルコードやpytest-mqttというライブラリで検証しましたが、今回の実装に沿ったtest検証はできなかったため、pytest-mockを用いた実装を行いました。
実際に実装した処理がMQTT Brokerを通して動かなければ意味がないため、その検証用にmosquittoをMac OSにinstallします。オープンソースのMQTT Brokerはいくつかありますが、今回は以前使用しておりinstallが容易なmosquittoを選択しています。
以下のコマンドでmosquittoをinstallします。
brew install mosquitto
install後は、mosquittoが正常に立ち上がるかを確認します。
brew services start mosquitto brew services list
立ち上がった場合は以下のようにstatusがstartedになります。
(base) @blog_code % brew services list Name Status User File dbus none mosquitto started kasama~/Library/LaunchAgents/homebrew.mxcl.mosquitto.plist mysql@5.7 none unbound none (base) kasama@blog_code %
停止したい場合はstopコマンドを実行します。
brew services stop mosquitto
実装
それでは実装になります。実装の構成は以下になります。pythonでMQTT送受信
記事を参考にmain.pyを実装し、main.pyをpytestでtestします。
(31-paho-mqtt-pytest-mocking-py3.12) @31_paho_mqtt_pytest_mocking % tree . ├── README.md ├── main.py ├── pyproject.toml └── tests ├── conftest.py └── test_main.py 4 directories, 9 files (31-paho-mqtt-pytest-mocking-py3.12) @31_paho_mqtt_pytest_mocking %
まずは必要なライブラリをインストールします。
pip install paho-mqtt pytest pytest-mock pytest-cov
main.py
import json import paho.mqtt.client as mqtt # MQTTのライブラリをインポート import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # ブローカーに接続できたときの処理 def on_connect(client, userdata, flag, rc): logger.info("Connected with return code " + str(rc)) # 接続できた旨表示 logger.info("subscribe drone/#") client.subscribe("drone/#") # subするトピックを設定 # ブローカーが切断したときの処理 def on_disconnect(client, userdata, rc): if rc != 0: logger.warning("Unexpected disconnection.") # publishが完了したときの処理 def on_publish(client, userdata, mid): logger.info(f"published: client:{client}, userdata: {userdata}, mid: {mid}") def publish_msg(client, topic, msg): logger.info(f"publish client:{client},topic:{topic},msg:{msg}") client.publish(topic, msg) # トピック名とメッセージを決めて送信 # メッセージが届いたときの処理 def on_message(client, userdata, msg): # msg.topicにトピック名が,msg.payloadに届いたデータ本体が入っている payload = json.loads(msg.payload.decode()) topic = str(msg.topic) logger.info( f"Received payload:{str(payload)}, topic: {topic}, with QoS: {str(msg.qos)}, userdata: {str(userdata)}" ) if "drone/" in topic: dron_num = topic.split("/")[1] if payload["status"] == "running": userdata.add(topic) publish_msg(client, f"feed/drone/{dron_num}", "start-runnning") elif payload["status"] == "stopped": userdata.discard(topic) publish_msg(client, f"feed/drone/{dron_num}", "stopped") def main(): # MQTTの接続設定 active_drones = set() client = mqtt.Client(userdata=active_drones) # クラスのインスタンス(実体)の作成 client.on_connect = on_connect # 接続時のコールバック関数を登録 client.on_disconnect = on_disconnect # 切断時のコールバックを登録 client.on_message = on_message # メッセージ到着時のコールバック client.on_publish = on_publish # メッセージ送信時のコールバック client.connect("localhost", 1883, 60) # 接続先は自分自身 client.loop_forever() # 永久ループして待ち続ける if __name__ == "__main__": main()
- on_connect関数で
client.subscribe("drone/#")
と記載することで、drone/
ではじまる全てのTOPICに対してSubscribeします。 - on_message関数でTOPICが
drone/
である際にfeed/drone/
TOPICにpublishするような実装とします。 - userdataでは、statusがrunning状態のdroneを保持する処理を実装しています。
conftest.py
import pytest import paho.mqtt.client as mqtt def pytest_runtest_setup(item): print(f"\n--------Start----{item.name}---") def pytest_runtest_teardown(item): print(f"\n--------End----{item.name}-----") # MQTTクライアントのモックを提供するフィクスチャ @pytest.fixture def mock_client(mocker): return mocker.Mock(spec=mqtt.Client) # active_dronesをフィクスチャとして定義 @pytest.fixture(scope="module") def active_drones(): return set()
conftest.pyはpytestがテストを検出して実行する際に参照されるファイルです。pytestはテストを実行する前に自動的にconftest.pyを読み込んで、その中に定義されているfixtureやhook関数を利用できるようにします。conftest.pyの中で定義された内容は、特定のイベントが発生したときに実行されます。
- fixture: テスト関数にfixture名が引数として渡されたときに呼び出されます。fixtureのscope(
function
,class
,module
,session
)によって、そのライフサイクル(再利用される期間)が決定されます。 - hook関数(
pytest_runtest_setup
,pytest_runtest_teardown
など): pytestの特定の実行フェーズ(テスト前準備やテスト後始末など)で自動的に呼び出されます。 conftest.py自体はテスト実行プロセスの初期化中に読み込まれるが、その中のfixtureやhook関数は上記のような特定の状況やテストごとの実行フェーズに従って実行されます。 テスト関数が実行される直前にはpytest_runtest_setup
が呼ばれ、テスト関数の実行が完了した直後にはpytest_runtest_teardown
が呼ばれます。 - mock_clientでは
mqtt.Client
をmock化し、active_drones
はmoduleごとに共通のset
を使用したいために定義しています。 -
Python(pytest)でテスト書くならfixture,conftest,parametrizeを理解すると世界が一気に変わる
test_main.py
import json import sys from pathlib import Path # ルートディレクトリへのパスを追加 current_dir = Path(__file__).parent root_dir = current_dir.parent sys.path.append(str(root_dir)) import pytest import main # sub.pyの内容をインポート def test_on_connect(mock_client): main.on_connect(mock_client, None, None, 0) mock_client.subscribe.assert_called_with("drone/#") def test_on_disconnect(mocker, mock_client): # logger.warningのモックを作成 mock_warning = mocker.patch("main.logger.warning") # 正常な切断シナリオ(rc=0) main.on_disconnect(mock_client, None, 0) mock_warning.assert_not_called() # 通常の切断ではwarningが記録されないことを確認 mock_warning.reset_mock() # 異常な切断シナリオ(rc!=0) main.on_disconnect(mock_client, None, 1) # 異常切断ではwarningが記録されることを確認 mock_warning.assert_called_once_with("Unexpected disconnection.") @pytest.mark.parametrize( "topic, status, expected_call", [ ("drone/001", "running", "start-runnning"), ("drone/002", "stopped", "stopped"), ("drone/003", "running", "start-runnning"), ("drone/004", "stopped", "stopped"), ], ) def test_on_message(mock_client, mocker, active_drones, topic, status, expected_call): mock_msg = mocker.Mock() payload_json = json.dumps({"status": status}) mock_msg.topic = topic mock_msg.payload = payload_json.encode() mock_msg.qos = 1 main.on_message(mock_client, active_drones, mock_msg) mock_client.publish.assert_called_once_with(f"feed/{topic}", expected_call) mock_client.publish.reset_mock() def test_main(mocker, mock_client): mocker.patch("main.mqtt.Client", return_value=mock_client) main.main() mock_client.connect.assert_called_with("localhost", 1883, 60) mock_client.loop_forever.assert_called()
test_main.py
で実際のテスト内容を実装しています。
- 最初にルートディレクトリのpathを追加しているのは、
main.py
をimportするためです。test_main.py
が存在するカレントディレクトリ(tests)配下では、import errorとなってしまうため、main.py
が存在するルートディレクトリへのpathを追加しています。 test_on_connect
関数では、mock化したmqtt.Client
を用いてsubscribe関数がdrone/#"
を引数として実行されたか試験します。test_on_disconnect
関数では、mock化したmqtt.Client
を用いて切断処理の正常/異常を試験しています。test_on_message
関数では、mock化したmqtt.Client
、msg
、データを用いて、publish
関数が想定の引数で呼ばれているか、active_drones
の値は想定通りかを試験しています。parametrize
は1つのテスト関数で色々なパターンのデータでテストをしたい場合に使用します。test_main
関数では、mock化したmain.mqtt.Client
を用いてconnect
やloop_forever
が実行されたかを試験します。
実行
pytest実行前にmosquittoを用いて実装が動作するかをテストします。mosquittoのstatusはstarted
であることを確認し、main.py
を実行します。
(31-paho-mqtt-pytest-mocking-py3.12) @31_paho_mqtt_pytest_mocking % python main.py INFO:__main__:Connected with return code 0 INFO:__main__:subscribe drone/#
この状態で以下のコマンドを実行し、brokerにpublishします。
mosquitto_pub -h localhost -t "drone/001" -m '{"status": "running"}' mosquitto_pub -h localhost -t "drone/002" -m '{"status": "running"}' mosquitto_pub -h localhost -t "drone/001" -m '{"status": "stopped"}'
正常にsubscribe,publishできていること、userdataがadd
、discard
処理ができていることを確認できました。
次にpytestを実行します。私の場合poetryでプロジェクト管理をしており、tomlファイル上でpytest実行時のlogger出力を設定していますが、その設定が無い場合でも標準出力やlogger出力を表示させるために以下のoptionを付与したコマンドを実行します。
pytest -o log_cli=true -o log_cli_level=DEBUG -s
-o log_cli
オプションは、true
に設定することによりpytestのログ出力をCLIを通じて実行されているConsole(Terminal)に直接表示させる設定です。-o log_cli_level
オプションは、CLIを通じて実行されているConsole(Terminal)へのログ出力レベルを設定するものです。-s
オプションは、テスト中の標準出力、標準エラー出力をコンソール上に表示されることにする設定です。デフォルトでは、pytestはこれらの出力をキャプチャして、テストが失敗したときにのみ表示する設定になっています。
実行結果になります。想定通りtestは成功しました。test_on_message
も4回実行されています。
coverageも取得してみるために、以下コマンドを実行します。
pytest --cov
実行できなかった処理が2つあったため、処理全体のcoverageは98%となりました。
続いて網羅されていない箇所を確認したいため、cov-report
オプションを付与して実行しreportをhtmlとして出力します。
pytest --cov --cov-report=html
index.html
ファイルがhtmlconv
フォルダの中に生成されるためブラウザ上からhtmlを表示します。
網羅されていない2箇所を確認することができました。
最後に
paho-mqttはまだまだ細かい設定があり、ドキュメントも豊富なので引き続き学習していきたいと思いました。